
[Config Refactor][1/N] Model Pipeline Configuration System#1115

Open
lishunyang12 wants to merge 6 commits into vllm-project:main from lishunyang12:config_ref

Conversation


@lishunyang12 lishunyang12 commented Jan 30, 2026

Summary

Multi-stage omni models (Qwen3-Omni, Qwen2.5-Omni, Bagel, Qwen3-TTS) currently define their pipeline structure and runtime knobs in a single flat YAML. This makes it unclear which params are structural (stage ordering, worker types, input edges) vs. user-tunable (gpu_memory_utilization, tp_size, devices), and forces users to understand internal details just to change a runtime knob.

This PR introduces a model pipeline configuration system that cleanly separates the two concerns:

| | Model Pipeline Config | CLI Overrides |
| --- | --- | --- |
| Owner | Model developer | End user |
| Set via | `model_pipelines/<model>/pipeline.yaml` (bundled) | CLI args / API params |
| Examples | stage ordering, `input_sources`, `worker_type`, custom hooks, `final_output_type` | `gpu_memory_utilization`, `tp_size`, `devices`, `max_batch_size`, `enforce_eager` |
| Mutability | Fixed at integration time | Per-run |

This is the foundation PR ([1/N]) -- it adds the data model, pipeline files, OmegaConf isolation, and unit tests. The factory (StageConfigFactory.create_from_model()) is defined but not yet wired into the engine startup path; that wiring is [2/N].

Directory structure

Each model type gets its own directory under vllm_omni/model_pipelines/:

```text
vllm_omni/model_pipelines/
  __init__.py                          # PIPELINES_DIR, get_pipeline_path()
  bagel/
    pipeline.yaml                      # Pipeline structure (stages, types, data-flow)
    default_args.yaml                  # Default runtime/engine args, sampling params
  qwen2_5_omni/
    pipeline.yaml
    default_args.yaml
  qwen3_omni_moe/
    pipeline.yaml
    default_args.yaml
  qwen3_tts/
    pipeline.yaml
    default_args.yaml
```
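For illustration, a `pipeline.yaml` might look roughly like the sketch below. The field names follow the `StageConfig` attributes described in this PR (`stage_id`, `stage_type`, `worker_type`, `input_sources`, `final_output`, `final_output_type`), but the exact schema is hypothetical:

```yaml
# Hedged sketch of model_pipelines/qwen3_omni_moe/pipeline.yaml -- not the exact schema.
model_type: qwen3_omni_moe
stages:
  - stage_id: 0
    stage_type: llm
    model_stage: thinker
    worker_type: ar
    final_output: true
    final_output_type: text
  - stage_id: 1
    stage_type: llm
    model_stage: talker
    worker_type: ar
    input_sources: [0]
  - stage_id: 2
    stage_type: llm
    model_stage: code2wav
    worker_type: generation
    input_sources: [1]
    final_output: true
    final_output_type: audio
```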

How different model types are configured

| Model type | Config source | Stages | Example |
| --- | --- | --- | --- |
| Multi-stage LLM | Pipeline YAML (auto-detected) | 2-3 | Qwen3-Omni (Thinker -> Talker -> Code2Wav) |
| Multi-stage LLM + diffusion | Pipeline YAML (auto-detected) | 2+ | Bagel (Thinker -> DiT via SharedMemoryConnector) |
| Single-stage diffusion | No YAML -- `create_default_diffusion()` fallback | 1 | Standalone image generation models |

For single-stage diffusion models, when no pipeline YAML is found, the orchestrator automatically falls back to StageConfigFactory.create_default_diffusion() which programmatically creates a minimal config (stage_type=diffusion, final_output=image). No YAML authoring needed.
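The fallback can be sketched as follows. This is a hedged, self-contained illustration: the real `StageConfigFactory.create_default_diffusion()` lives in `config/stage_config.py`, and the field names here mirror the PR description rather than the actual implementation.

```python
# Hedged sketch of the single-stage diffusion fallback; field names mirror
# the StageConfig attributes described in the PR, not the real code.
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class StageType(Enum):
    LLM = "llm"
    DIFFUSION = "diffusion"


@dataclass
class StageConfig:
    stage_id: int
    stage_type: StageType
    final_output: bool = False
    final_output_type: str | None = None
    runtime_overrides: dict[str, Any] = field(default_factory=dict)


def create_default_diffusion() -> list[StageConfig]:
    # Minimal one-stage config: stage_type=diffusion, final_output=image.
    return [
        StageConfig(
            stage_id=0,
            stage_type=StageType.DIFFUSION,
            final_output=True,
            final_output_type="image",
        )
    ]
```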

Key classes

| Class | Location | Purpose |
| --- | --- | --- |
| `StageType` | `config/stage_config.py` | Enum: `LLM` or `DIFFUSION` |
| `StageConfig` | `config/stage_config.py` | Dataclass for one pipeline stage: identity, edges, output config, worker config, runtime overrides |
| `ModelPipeline` | `config/stage_config.py` | Container for a model pipeline graph, plus `validate_pipeline()` for integration-time validation |
| `StageConfigFactory` | `config/stage_config.py` | Entry point: auto-detects model type, loads pipeline YAML, merges CLI overrides |
| `yaml_util` wrappers | `config/yaml_util.py` | `create_config`, `load_yaml_raw`, `merge_configs`, `to_dict` -- isolates all OmegaConf usage in one file |
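The kind of integration-time checks `validate_pipeline()` performs can be sketched with plain dicts. This is a hedged stand-in, not the real implementation; the two checks shown (dangling `input_sources` references, missing entry point) are the ones the PR's tests exercise:

```python
# Hedged sketch of integration-time pipeline validation; the real
# ModelPipeline.validate_pipeline() may perform additional checks.
def validate_pipeline(stages: list[dict]) -> list[str]:
    errors: list[str] = []
    ids = {s["stage_id"] for s in stages}
    # Every input_sources entry must reference an existing stage.
    for s in stages:
        for src in s.get("input_sources", []):
            if src not in ids:
                errors.append(f"stage {s['stage_id']}: unknown input source {src}")
    # At least one stage with no inputs must exist (the entry point).
    if not any(not s.get("input_sources") for s in stages):
        errors.append("pipeline has no entry point stage")
    return errors
```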

Models with bundled pipelines

| Model | Pipeline directory | Stages | Output types |
| --- | --- | --- | --- |
| Qwen3-Omni-MoE | `qwen3_omni_moe/` | 3 (Thinker -> Talker -> Code2Wav) | text + audio |
| Qwen2.5-Omni | `qwen2_5_omni/` | 3 (Thinker -> Talker -> Code2Wav) | text + audio |
| Bagel | `bagel/` | 2 (Thinker -> DiT) | text + image |
| Qwen3-TTS | `qwen3_tts/` | 2 (Qwen3-TTS -> Code2Wav) | audio |

Benefits

For users: no more YAML editing

Before -- to change gpu_memory_utilization, users had to find and edit the legacy YAML, which mixes topology with runtime params in 100+ lines.

After -- users pass CLI args. Pipeline is auto-detected and auto-loaded:

```bash
# Change gpu_memory_utilization for all stages:
python end2end.py --gpu-memory-utilization 0.9

# Override a specific stage only:
python end2end.py --gpu-memory-utilization 0.9 --stage-1-gpu-memory-utilization 0.6

# Multiple overrides:
python end2end.py --gpu-memory-utilization 0.9 --tensor-parallel-size 2 --enforce-eager
```

For model developers: adding a new model is minimal

| Step | What to do |
| --- | --- |
| 1 | Create `model_pipelines/new_model/pipeline.yaml` |
| 2 | Create `model_pipelines/new_model/default_args.yaml` |
| 3 | Add one line in `StageConfigFactory.PIPELINE_DIRS` |
| 4 | Done -- all CLI overrides work automatically via `_merge_cli_overrides()` |
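The registry lookup in step 3 can be sketched as a simple `model_type` to directory mapping. Names below (`PIPELINES_DIR`, `PIPELINE_DIRS`, `get_pipeline_path`) follow the PR description; the exact signatures are assumptions:

```python
# Hedged sketch of the model_type -> pipeline directory registry; the real
# registry is StageConfigFactory.PIPELINE_DIRS in config/stage_config.py.
from __future__ import annotations

from pathlib import Path

PIPELINES_DIR = Path("vllm_omni/model_pipelines")  # placeholder root

PIPELINE_DIRS = {
    "qwen3_omni_moe": "qwen3_omni_moe",
    "qwen2_5_omni": "qwen2_5_omni",
    "bagel": "bagel",
    "qwen3_tts": "qwen3_tts",
    # Step 3 for a new model would add: "new_model": "new_model",
}


def get_pipeline_path(model_type: str) -> Path | None:
    subdir = PIPELINE_DIRS.get(model_type)
    if subdir is None:
        return None  # no bundled pipeline -> create_default_diffusion() fallback
    return PIPELINES_DIR / subdir / "pipeline.yaml"
```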

For maintainers: OmegaConf is now isolated

| Before | After |
| --- | --- |
| `OmegaConf.load()` scattered across `omni.py`, `utils.py`, `stage_utils.py` | All calls routed through `yaml_util.py` |
| Migrating away from OmegaConf requires touching every file | Change one file |
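The merge semantics that `merge_configs` provides can be illustrated with a stdlib-only deep merge. This is a hedged sketch of the behavior, not the wrapper itself; the real `merge_configs` in `yaml_util.py` delegates to OmegaConf:

```python
# Stdlib-only illustration of deep-merge semantics: nested sections are
# merged recursively, scalars/lists from the override win.
from typing import Any


def merge_configs(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
    out = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(out.get(key), dict):
            out[key] = merge_configs(out[key], value)  # recurse into nested sections
        else:
            out[key] = value  # override wins for scalars and lists
    return out
```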

Architecture

Pipeline structures supported

Qwen3-Omni / Qwen2.5-Omni (3-stage, text+audio output):

  [Thinker (LLM/AR)] --> [Talker (LLM/AR)] --> [Code2Wav (LLM/gen)]
   final:text                                    final:audio


Bagel (2-stage, text+image output):

  [Thinker (LLM/AR)] --> [DiT (diffusion)]
   final:text              final:image
                         (SharedMemoryConnector)


Qwen3-TTS (2-stage, audio output):

  [Qwen3-TTS (LLM/AR)] --> [Code2Wav (LLM/gen)]
                             final:audio


Single-stage diffusion (image generation, no YAML):

  [Diffusion]  <-- auto-created by create_default_diffusion()
   final:image

Override precedence

Model Pipeline Config (bundled, read-only)
  |
  v
Global CLI args (--gpu-memory-utilization 0.8)  --> apply to ALL stages
  |
  v
Per-stage CLI args (--stage-1-gpu-memory-utilization 0.6)  --> override specific stage
  |
  v
Final StageConfig.runtime_overrides dict
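The precedence chain above can be captured in a few lines. This is a self-contained sketch that mirrors `_merge_cli_overrides` in spirit; the function name and the flat `stage_<id>_<param>` key convention are taken from the PR, everything else is illustrative:

```python
# Sketch of the override precedence: per-stage CLI keys (stage_<id>_<param>)
# beat global CLI keys, which beat bundled pipeline defaults.
from typing import Any


def resolve_runtime(stage_id: int, bundled: dict[str, Any], cli: dict[str, Any]) -> dict[str, Any]:
    result = dict(bundled)
    # 1) Global CLI args apply to all stages.
    for key, value in cli.items():
        if value is not None and not key.startswith("stage_"):
            result[key] = value
    # 2) Per-stage CLI args override the globals for this stage only.
    prefix = f"stage_{stage_id}_"
    for key, value in cli.items():
        if value is not None and key.startswith(prefix):
            result[key[len(prefix):]] = value
    return result
```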

Migration path

                    resolve_model_config_path
          legacy -->  load_stage_configs_from_*  --> flat OmegaConf dicts
         (current)    (deprecated, utils.py)

                    StageConfigFactory
           new  -->   .create_from_model()      --> list[StageConfig]
         ([2/N])      (stage_config.py)

Both paths coexist during the migration. Once [2/N] wires StageConfigFactory into engine startup, the legacy path can be removed.

Testing

Environment: 2x NVIDIA A100-SXM4-80GB, PyTorch 2.10.0+cu128, CUDA 12.8, Python 3.12.3, vLLM 0.16.0

Unit tests

```bash
pytest tests/test_config_factory.py -v   # -> 30/30 PASSED
```

Covers: StageType, StageConfig, ModelPipeline, StageConfigFactory, CLI override merging, YAML pipeline parsing, validation, and edge cases.

End-to-end inference (all bundled pipelines)

| Model | Command | Result |
| --- | --- | --- |
| Qwen3-TTS (1.7B, 1 GPU) | `end2end.py --query-type CustomVoice` | PASSED |
| Qwen3-TTS (1.7B, 1 GPU) | `end2end.py --query-type VoiceDesign` | PASSED |
| Bagel (7B, 2 GPUs) | `end2end.py --modality text2img` | PASSED |
| Qwen2.5-Omni (7B, 2 GPUs) | `end2end.py` | PASSED |
| Qwen3-Omni (30B, 2 GPUs) | `end2end.py` (default) | PASSED |

CLI override tests (Qwen3-Omni)

| CLI args | Result |
| --- | --- |
| `--gpu-memory-utilization 0.85 --enforce-eager` | PASSED |
| `--output-dir /tmp/pr1115_test_output` | PASSED |

@lishunyang12 lishunyang12 changed the title [Core] Two-Tier Stage Configuration System - Phase 1: Core Infrastructure [Core] Two-Tier Stage Configuration System - [1/N] Core Infrastructure Jan 30, 2026

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 5f718c777c


Comment on lines +205 to +207
```python
# Filter stages if --stage-id is specified (for independent launch)
if stage_id_filter is not None:
    filtered_configs = [cfg for cfg in self.stage_configs if getattr(cfg, "stage_id", None) == stage_id_filter]
```


P1 Badge Preserve stage IDs when filtering configs

Filtering down to a single stage_id here reindexes self.stage_configs from 0, but downstream _start_stages still uses enumerate(self.stage_list) to pick the stage_id passed into get_stage_connector_config/resolve_omni_kv_config_for_stage. In an independent launch like --stage-id 1, the only stage will be treated as stage 0 for connector wiring, so it will subscribe to the wrong edges and never receive upstream payloads. Consider keeping original stage IDs or introducing an index→stage_id mapping for connector setup and scheduling in independent stage mode.

Contributor Author

Added a NOTE comment explaining this is intentional — in independent
launch mode the stage runs in isolation without cross-stage connectors, so
list index 0 is correct.

@lishunyang12 lishunyang12 changed the title [Core] Two-Tier Stage Configuration System - [1/N] Core Infrastructure [Refactor] Two-Tier Stage Configuration System - [1/N] Core Infrastructure Jan 30, 2026
@lishunyang12 lishunyang12 changed the title [Refactor] Two-Tier Stage Configuration System - [1/N] Core Infrastructure [Config Refactor][1/N] Two-Tier Stage Configuration & Independent Service Launch Jan 30, 2026
@lishunyang12 lishunyang12 changed the title [Config Refactor][1/N] Two-Tier Stage Configuration & Independent Service Launch [WIP][Config Refactor][1/N] Two-Tier Stage Configuration & Independent Service Launch Jan 30, 2026
Contributor

@xiedeyantu xiedeyantu left a comment

Thank you for your excellent work. I have a few questions.

```python
batch_timeout = kwargs.get("batch_timeout", 10)
stage_configs_path = kwargs.get("stage_configs_path", None)
log_stats = kwargs.get("log_stats", False)
stage_id_filter = kwargs.get("stage_id", None)  # For independent stage launch
```
Contributor

Maybe keeping stage_id is better?

Contributor Author

Agreed, renamed to stage_id

```
final_output: Whether this stage produces final output.
final_output_type: Type of final output ("text", "audio", "image").
worker_type: Worker type ("ar" or "generation").
scheduler_cls: Full module path to scheduler class.
```
Contributor

I have a small question: will there be multiple cls values? Which one will be used by default if I don't specify it? Could you add a comment to explain this?

Contributor Author

Added a comment. When None (default), the engine uses its built-in scheduler. Model developers can specify a custom class (e.g. vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler) to override scheduling for a particular stage.

Collaborator

Can we remove scheduler_cls, or make the default shorter (OmniARScheduler / OmniGenerationScheduler, ...)? I don't remember why we need to keep vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler.

Contributor Author

@lishunyang12 lishunyang12 Feb 23, 2026

The full path is needed because scheduler_cls gets dynamically imported at runtime (importlib.import_module in omni_stage.py), so it has to be a resolvable module path. We could add a short-name registry that maps e.g. OmniARScheduler -> vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler, but that adds another lookup table to maintain. I would lean toward keeping the full path for now since it is explicit and only lives in the topology YAML (not user-facing). But happy to add the registry if you prefer — would be a small change.
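The dynamic import described above works roughly like this; the helper name `load_class` is hypothetical, but the `importlib.import_module` + `getattr` pattern is standard, demonstrated here with a stdlib class:

```python
# Resolve a dotted module path like "collections.OrderedDict" to a class,
# the way a full scheduler_cls path would be resolved at runtime.
import importlib


def load_class(dotted_path: str):
    module_path, _, class_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


OrderedDict = load_class("collections.OrderedDict")
```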

```python
# Build full config dict
config_dict: dict[str, Any] = {
    "stage_id": self.stage_id,
    "stage_type": self.stage_type.value if isinstance(self.stage_type, StageType) else self.stage_type,
```
Contributor

Is it better to use an enumeration here or not? If we use an enumeration, should the else block throw an error?

Contributor Author

Good catch. Changed to StageType(self.stage_type).value, which enforces the enum and raises ValueError on invalid values.
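The fix relies on a standard `Enum` property: calling the enum class normalizes both raw values and existing members, and raises `ValueError` for anything else. A minimal sketch (the `normalize` helper is illustrative, not from the PR):

```python
# StageType(value).value accepts both "llm" strings and StageType members,
# and rejects invalid input with ValueError.
from enum import Enum


class StageType(Enum):
    LLM = "llm"
    DIFFUSION = "diffusion"


def normalize(stage_type) -> str:
    return StageType(stage_type).value  # raises ValueError on invalid input
```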

```python
"stage_type": self.stage_type.value if isinstance(self.stage_type, StageType) else self.stage_type,
"engine_args": OmegaConf.create(engine_args),
"runtime": OmegaConf.create(runtime),
"engine_input_source": self.input_sources,  # Legacy field name
```
Contributor

Will input_sources use the values from edges subsequently?

Contributor Author

Added a comment. For now input_sources is the simple upstream ID list. In
the future it may be derived from StageTopology.edges for richer edge
metadata (data format, buffering policy).

@@ -0,0 +1,416 @@
# SPDX-License-Identifier: Apache-2.0
Collaborator

Could we also add tests for stage yaml parsing?

Contributor Author

Make sense, I will add it.

Contributor Author

Added TestTopologyYamlParsing with 5 tests: full qwen3_omni_moe parsing, legacy engine_input_source backward compat, connectors/edges, validation pass-through, and diffusion stage type.

```python
connectors: dict[str, Any] | None = None
edges: list[dict[str, Any]] | None = None

def validate_dag(self) -> list[str]:
```
Contributor

I don't see why we need to check for cyclic dependencies. It adds extra cost with almost no benefit. Am I missing something? Once we support a model, its topology is valid and static; we wouldn't need to check or update it anymore.

Contributor Author

@lishunyang12 lishunyang12 Feb 1, 2026

You're right. This would indeed add unnecessary workload. Now that you've pointed it out, I understand the issue. As someone new to testing, I'll revert the change and will be more mindful of this in future PRs.

Contributor Author

Removed _detect_cycles(). Topology is static and validated at integration time, so cycle detection adds cost with no benefit.

@lishunyang12 lishunyang12 force-pushed the config_ref branch 4 times, most recently from f8b0ba0 to f7e7639 Compare February 9, 2026 20:20
Contributor

@wuhang2014 wuhang2014 left a comment

Big step forward on the refactor. I have a suggestion: we could use dataclasses for configuration in omni.py and omni_stage.py instead of dicts.

```
Two-Tier Stage Configuration System for vLLM-Omni.

Design Principles:
- Tier-1 (DAG Topology): INTERNAL ONLY - set by model developers at integration time
```
Contributor

We may just naming it pipeline configuration or model pipeline configuration. DAG is one of implementations other than an abstraction.

Contributor Author

Good point. Renamed all "DAG Topology" references to "Pipeline Topology" throughout docstrings and comments. Also renamed validate_dag() → validate_topology() for consistency.

Comment on lines +372 to +376
```python
topology_path = PROJECT_ROOT / "vllm_omni" / "model_executor" / "stage_topologies" / topology_file

if not topology_path.exists():
    logger.debug(f"Topology file not found: {topology_path}")
    return None
```
Contributor

Is there any other method that can be more elegant to implement this? Maybe we can create a python module in topo dir, and just import the module to involve the config file. Or just move class StageConfigFactory to topo dir.

Contributor Author

Agreed — the hardcoded PROJECT_ROOT path was fragile. Created stage_topologies/__init__.py as a proper Python package that exposes get_topology_path(filename). Now stage_config.py just does from vllm_omni.model_executor.stage_topologies import get_topology_path and calls get_topology_path(topology_file) — no more PROJECT_ROOT computation.
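A hedged sketch of that package-based lookup: the package anchors paths to its own directory instead of a computed project root. The body of the real __init__.py may differ.

```python
# Hedged sketch of stage_topologies/__init__.py: paths are anchored to the
# package's own directory rather than a hardcoded PROJECT_ROOT.
from pathlib import Path

# In a real __init__.py this is simply Path(__file__).resolve().parent;
# the globals() fallback only keeps this sketch runnable in a REPL.
_PKG_DIR = Path(globals().get("__file__", ".")).resolve().parent


def get_topology_path(filename: str) -> Path:
    """Return the absolute path of a bundled topology YAML."""
    return _PKG_DIR / filename
```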

Comment on lines +253 to +263
```python
runtime_params = {
    "devices",
    "gpu_memory_utilization",
    "tensor_parallel_size",
    "enforce_eager",
    "max_num_batched_tokens",
    "trust_remote_code",
    "max_batch_size",
    "distributed_executor_backend",
    "enable_prefix_caching",
}
```
Contributor

Can other CLI args be overridden? In my opinion, all CLI parameters registered by the engines can be used to construct EngineArgs.

Contributor Author

You're right. Changed from a hardcoded allowlist to a blocklist approach: _INTERNAL_KEYS lists the orchestrator-only keys that should not be forwarded (model, stage_configs_path, batch_timeout, etc.). Everything else is passed through, so any engine-registered CLI arg works out of the box. The old RUNTIME_PARAMS set is kept as documentation but no longer gates forwarding.

```python
# TODO: hack, convert dtype to string to avoid non-primitive omegaconf create error.
if "dtype" in kwargs:
    kwargs["dtype"] = str(kwargs["dtype"])

def _create_default_diffusion_stage_cfg(self, kwargs: dict[str, Any]) -> list[dict[str, Any]]:
```
Contributor

I'm thinking if we can define the stage config in python dataclass. Because dict is not easy to read when reading the code, and checks cannot be applied to dict too.

Contributor Author

Agreed. create_default_diffusion() now builds a typed StageConfig dataclass first (with validation), then converts to the legacy OmegaConf dict at the boundary for backward compatibility with OmniStage. This gives us type safety at creation time while keeping the existing downstream contract intact. Full migration of OmniStage to consume StageConfig directly is planned for a later phase.

@lishunyang12 lishunyang12 force-pushed the config_ref branch 2 times, most recently from 71008fc to 4c39c90 Compare February 11, 2026 16:07
Contributor Author

@lishunyang12 lishunyang12 left a comment

have some concerns around the independent stage launch path

@hsliuustc0106
Collaborator

can we make it into the v0.16.0 stable release? this is going to be essential for improving user experience

@hsliuustc0106
Collaborator

@tzhouam @Gaohan123 @fake0fan please review this PR in detail

@lishunyang12 lishunyang12 force-pushed the config_ref branch 2 times, most recently from c615016 to 35f8a59 Compare February 23, 2026 02:12
@lishunyang12
Contributor Author

lishunyang12 commented Feb 23, 2026

Yes, targeting v0.16.0. Main dependency is #939 — once that merges I will rebase to hook into load_and_resolve_stage_configs() and remove the [WIP] tag.

@lishunyang12 lishunyang12 force-pushed the config_ref branch 4 times, most recently from 76a7185 to 1aa4640 Compare February 23, 2026 03:26
@lishunyang12
Contributor Author

This PR is ready for review.


```python
# Mapping of model types to architecture classes
ARCH_MAPPING: dict[str, str] = {
    "qwen3_omni_moe": "Qwen3OmniMoeForConditionalGeneration",
```
Contributor

Please consider whether it's necessary to create the model_type mapping here. In the current Qwen3-TTS implementation, we need to add model_arch: Qwen3TTSCode2Wav in stage 1. Not sure whether that's a bug in Qwen3-TTS. Or should we directly reuse vLLM's model_arch parameter and make per-stage selection less restrictive?

Contributor Author

Agreed — a model-level arch mapping can't handle per-stage differences (e.g., Qwen3-TTS code2wav needs Qwen3TTSCode2Wav, distinct from stage 0). Removed both ARCH_MAPPING and _auto_detect_model_arch since they're dead code in [1/N] anyway. When this gets wired up in [2/N], it should be per-stage model_arch in the topology YAML.

Contributor Author

The model_type mapping (now PIPELINE_MODELS) is how we map HF config model_type → pipeline YAML directory. For model_arch in code2wav — that should go in default_args.yaml as a per-stage engine arg, not in the pipeline topology. Will check if the Qwen3-TTS impl needs a fix for this.

Contributor

@fake0fan fake0fan left a comment

left some comments. TBH I'm still considering whether to include Independent Service Launch in this PR.

```python
]
topology = StageTopology(model_type="test", stages=stages)
errors = topology.validate_topology()
assert any("entry point" in e.lower() for e in errors)
```
Contributor

test_mutual_dependency_detected_as_missing_entry and test_missing_entry_point are duplicates.

Contributor Author

Yep, same setup and assertion — removed it.

Contributor Author

Good catch, removed the duplicate test.

Comment on lines +506 to +517
```python
parser.add_argument(
    "--gpu-memory-utilization",
    type=float,
    default=None,
    help="GPU memory utilization for all stages (Tier-2 override). Example: 0.9",
)
parser.add_argument(
    "--tensor-parallel-size",
    type=int,
    default=None,
    help="Tensor parallel size for all stages (Tier-2 override). Example: 2",
)
```
Contributor

So if these parameters are set individually now, they will apply to all stages?

Contributor Author

Right — global CLI params like --gpu-memory-utilization apply to every stage by default. Per-stage overrides (--stage-N-*) take precedence when specified. Added a comment at the kwargs block to clarify this.

Contributor Author

Yes, global args like --gpu-memory-utilization apply to all stages. Per-stage overrides (--stage-N-*) take precedence when specified — this will be wired up in [2/N].

Comment on lines +501 to +543
```python
def _merge_cli_overrides(
    cls,
    stage: StageConfig,
    cli_overrides: dict[str, Any],
) -> dict[str, Any]:
    """Merge CLI overrides into stage runtime config.

    All CLI arguments registered by engine config classes (e.g.
    EngineArgs / OmniDiffusionConfig) are accepted as overrides,
    not just the well-known ``RUNTIME_PARAMS`` set.

    Handles:
    - Global overrides (apply to all stages)
    - Per-stage overrides (--stage-N-* format, take precedence)

    Args:
        stage: The stage to merge overrides into.
        cli_overrides: CLI arguments from VllmConfig/OmniDiffusionConfig.

    Returns:
        Dict of runtime overrides for this stage.
    """
    result: dict[str, Any] = {}

    # Apply global overrides -- any key not in the internal blocklist
    # is forwarded so that engine-registered params work out of the box.
    for key, value in cli_overrides.items():
        if key in cls._INTERNAL_KEYS:
            continue
        if key.startswith("stage_"):
            # Per-stage keys handled below
            continue
        if value is not None:
            result[key] = value

    # Apply per-stage overrides (--stage-N-* format, take precedence)
    stage_prefix = f"stage_{stage.stage_id}_"
    for key, value in cli_overrides.items():
        if key.startswith(stage_prefix) and value is not None:
            param_name = key[len(stage_prefix):]
            result[param_name] = value

    return result
```
Contributor

RUNTIME_PARAMS is not used here.

Contributor Author

Good catch, removed. Only _INTERNAL_KEYS is actually used by _merge_cli_overrides.

Contributor Author

Removed in latest push. The blocklist approach in _INTERNAL_KEYS replaced it.

Comment on lines +385 to +407
```python
def test_extract_basic_params(self):
    """Test extracting basic runtime parameters."""
    from vllm_omni.entrypoints.utils import extract_runtime_overrides

    kwargs = {
        "gpu_memory_utilization": 0.9,
        "tensor_parallel_size": 2,
        "devices": "0,1",
        "custom_engine_param": "forwarded",
        "none_param": None,  # Should be excluded
        "model": "some_model",  # Internal key, should be excluded
    }

    result = extract_runtime_overrides(kwargs)

    assert result["gpu_memory_utilization"] == 0.9
    assert result["tensor_parallel_size"] == 2
    assert result["devices"] == "0,1"
    # Non-internal, non-None params are forwarded (extensible overrides)
    assert result["custom_engine_param"] == "forwarded"
    assert "none_param" not in result
    # Internal keys are excluded
    assert "model" not in result
```
Contributor

Since the detection below does not use RUNTIME_PARAMS, I think this test will fail.

Contributor Author

The test itself passes (it's testing the blocklist behavior, which works), but the comment was misleading. Fixed it to reference _INTERNAL_KEYS instead.

Comment on lines -180 to -190
```python
# TODO: hack, calculate devices based on parallel config.
devices = "0"
if "parallel_config" in kwargs:
    num_devices = kwargs["parallel_config"].world_size
    for i in range(1, num_devices):
        devices += f",{i}"
default_stage_cfg = [
    {
        "stage_id": 0,
        "stage_type": "diffusion",
        "runtime": {
```
Contributor

Can we also remove the _create_default_diffusion_stage_cfg function from async omni?

Contributor Author

Looked into it — async_omni's version has extra diffusion-parallel logic (ulysses_degree, ring_degree, DiffusionParallelConfig) that isn't in the base version. Not a straightforward dedup. Deferring to [2/N] where we can unify both properly.

@lishunyang12
Contributor Author

Good call on the independent service launch — I agree it doesn't belong here. #939 already adds --stage-id with the full distributed infra (ZMQ, --headless, master address), and our incomplete version would conflict with it. Removed the entire --stage-id / stage_id_filter path from this PR.

Also addressed the inline comments — thanks for the careful review.

@lishunyang12 lishunyang12 changed the title [WIP][Config Refactor][1/N] Two-Tier Stage Configuration & Independent Service Launch [Config Refactor][1/N] Two-Tier Stage Configuration System Feb 23, 2026
@lishunyang12 lishunyang12 force-pushed the config_ref branch 3 times, most recently from 2bc21e0 to fd9369a Compare February 23, 2026 10:47
Collaborator

@princepride princepride left a comment

I have a question:
Some fields in the old files have been discarded in the new topology. However, some default_sampling_params include temperature, top_k, stop_token_ids, etc. These appear to be neither pure topology nor pure runtime parameters, but rather default values bound to the model. async_chunk and engine_output_type are also inherent model attributes.
How are these fields planned to be handled during [2/N] integration? Will they be added back to the Tier-1 topology YAML, or are there other solutions?

```python
]
topology = StageTopology(model_type="test", stages=stages)

stage = topology.get_stage(1)
```
Collaborator

It seems topology doesn't have a get_stage method.

Contributor Author

Good catch, the method was missing. Added it.

```python
assert stage is not None
assert stage.model_stage == "talker"

missing = topology.get_stage(99)
```
Collaborator

Same here

Contributor Author

Fixed, same commit.

Comment on lines +458 to +472
for key, value in cli_overrides.items():
if key in cls._INTERNAL_KEYS:
continue
if key.startswith("stage_"):
# Per-stage keys handled below
continue
if value is not None:
result[key] = value

# Apply per-stage overrides (--stage-N-* format, take precedence)
stage_prefix = f"stage_{stage.stage_id}_"
for key, value in cli_overrides.items():
if key.startswith(stage_prefix) and value is not None:
param_name = key[len(stage_prefix) :]
result[param_name] = value
Collaborator

The filter logic isn't the same in the two loops. Why doesn't the second part skip cls._INTERNAL_KEYS?

Contributor Author

You're right, that's inconsistent — a per-stage key like stage_0_model would bypass the blocklist. Fixed, added a test for it too.

@hsliuustc0106
Collaborator

Review of [Config Refactor][1/N] Two-Tier Stage Configuration System

Overview

Foundation PR (1/N) that introduces a two-tier config system separating pipeline topology (Tier-1, developer-owned YAML) from runtime knobs (Tier-2, user CLI args). Adds the data model, topology files, OmegaConf isolation, and tests — but does not wire the new factory into the engine startup path yet.

+1307 / -52 across 14 files. Well-structured PR description with clear motivation.


Issues

1. Stale/misleading comment in end2end.py

# Import StageConfigFactory for Tier-2 CLI override testing
from vllm_omni.entrypoints.omni import Omni

The comment says "Import StageConfigFactory" but the actual import is Omni. Looks like a leftover from an earlier revision. Should be removed.

2. _merge_cli_overrides has a fragile stage_ prefix filter

In stage_config.py, the global override loop skips all keys starting with stage_:

if key.startswith("stage_"):
    # Per-stage keys handled below
    continue

This silently drops any future CLI arg that happens to start with stage_ but isn't a per-stage override (e.g., a hypothetical stage_init_timeout, which is currently only protected by _INTERNAL_KEYS, and the two filters are independent). A more precise check like re.match(r"stage_\d+_", key) would be safer and wouldn't rely on _INTERNAL_KEYS being kept in sync.
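The suggested regex lets one pass cleanly separate the two kinds of keys. A minimal sketch, with purely hypothetical override keys for illustration:

```python
import re

# Hypothetical CLI override keys, for illustration only.
cli_overrides = {
    "gpu_memory_utilization": 0.85,  # global override
    "stage_1_tp_size": 2,            # per-stage override for stage 1
    "stage_init_timeout": 300,       # global arg that merely starts with "stage_"
}

_PER_STAGE = re.compile(r"stage_(\d+)_")


def split_overrides(overrides: dict) -> tuple[dict, dict]:
    """Split global overrides from per-stage ones with one precise regex."""
    global_args, per_stage = {}, {}
    for key, value in overrides.items():
        if value is None:
            continue
        m = _PER_STAGE.match(key)
        if m:
            # A key like "stage_1_tp_size" maps to stage 1, param "tp_size".
            per_stage.setdefault(int(m.group(1)), {})[key[m.end():]] = value
        else:
            global_args[key] = value
    return global_args, per_stage


global_args, per_stage = split_overrides(cli_overrides)
```

Here stage_init_timeout stays a global arg without any help from _INTERNAL_KEYS, because the regex requires a digit run between the underscores.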

3. Per-stage CLI overrides described but not wired

The PR description advertises --stage-1-gpu-memory-utilization 0.6 style overrides, and _merge_cli_overrides handles the stage_N_ prefix pattern — but no argparse entries are defined for per-stage args in end2end.py. Users can't actually pass these today. This should either be documented as "coming in [2/N]" or the argparse should be extended with a dynamic arg parser.

4. Suspicious hf_config_name in qwen3_omni_moe.yaml code2wav stage

  - stage_id: 2
    model_stage: code2wav
    ...
    hf_config_name: thinker_config  # <-- why thinker_config for code2wav?

The code2wav stage references thinker_config as its HF config name. This looks like a copy-paste error — the thinker stage (stage 0) also uses hf_config_name: thinker_config. The qwen2.5-omni topology doesn't set hf_config_name on its code2wav stage at all. Worth verifying this is intentional.

5. create_from_model returns empty list as a sentinel

if topology is None:
    # No topology found - return empty list (caller should use default diffusion)
    return []

An empty list as a "fall back to diffusion" signal is an implicit contract. A caller that doesn't know this convention could misinterpret [] as "no stages." Consider returning None instead, or raising a specific exception, to make the contract explicit.
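A sketch of the Optional-return contract, using a hypothetical in-memory registry in place of the bundled topology lookup:

```python
from typing import Optional

# Hypothetical registry standing in for the bundled topology files.
_PIPELINES = {"qwen3_omni_moe": ["thinker", "talker", "code2wav"]}


def create_from_model(model_type: str) -> Optional[list]:
    """Return stage names for a known pipeline, or None when there is none.

    None makes the "fall back to the default diffusion path" contract
    explicit at call sites; an empty list would ambiguously read as a
    pipeline with zero stages.
    """
    stages = _PIPELINES.get(model_type)
    return None if stages is None else list(stages)


stages = create_from_model("some_diffusion_model")
if stages is None:
    stages = ["default_diffusion"]  # hypothetical fallback path
```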

6. _auto_detect_model_type unconditionally trusts remote code

hf_config = get_config(model, trust_remote_code=True)

This always passes trust_remote_code=True regardless of what the user specified. It should respect the user's --trust-remote-code flag, or at minimum document why it's hardcoded.

7. get_final_stage_id_for_e2e — subtle behavior change

The old code checked if final_stage_id_for_e2e < 0 after the loop. The new code initializes final_stage_id_for_e2e = last_stage_id before the loop. Functionally equivalent and arguably cleaner, but this is a behavioral change buried in a config refactor PR. If the variable was previously uninitialized when the loop found no match, this changes the error path. Should be called out in the PR description.


Minor Nits

  • bagel.yaml is the only topology with connectors and edges sections. The parsing code handles them, but test coverage for connector/edge parsing from YAML is thin — only the Bagel-specific test exercises it.
  • The enforce_eager / trust_remote_code checks in end2end.py use truthiness (if args.enforce_eager:) while other args use is not None. Correct for store_true args, but the asymmetry is worth a brief comment.
  • princepride's review question about default_sampling_params, async_chunk, and engine_output_type is valid — these fields exist in legacy YAMLs but have no home in the new Tier-1 schema. The [2/N] plan for these should be clarified.

What's Good

  • Clean separation of concerns between topology and runtime config is a real improvement over the monolithic YAML approach.
  • OmegaConf isolation into yaml_util.py is well done — makes future migration away from OmegaConf tractable.
  • Test coverage is solid: 462 lines covering enum values, dataclass construction, DAG validation (cycles, duplicates, missing sources), factory methods, CLI override merging, and YAML parsing.
  • The backward-compat path (to_omegaconf(), legacy field support in omni_stage.py) is thoughtful for a gradual migration.
  • Topology YAMLs are clean and minimal compared to the legacy format.

@lishunyang12
Contributor Author

How are these fields planned to be handled during [2/N] integration?

Good question. Fields like default_sampling_params and engine_output_type are model-intrinsic defaults, so they'll go into Tier-1 YAML in [2/N] when we wire the factory into engine startup. async_chunk is similar. The plan is to add them as optional fields on StageConfig so topology files can declare them, while still allowing Tier-2 CLI overrides where it makes sense (e.g. overriding temperature). Will make this clearer in the next PR.
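The plan above could look roughly like this in [2/N]; this is a hypothetical sketch of optional model-intrinsic fields on StageConfig, not the actual PR code:

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class StageConfig:
    """Hypothetical sketch of the [2/N] plan: model-intrinsic defaults
    declared per stage, with selective CLI overrides layered on top."""

    stage_id: int
    model_stage: str
    default_sampling_params: dict = field(default_factory=dict)
    engine_output_type: Optional[str] = None
    async_chunk: Optional[bool] = None

    def merged_sampling_params(self, cli_overrides: dict) -> dict:
        # CLI values win over model defaults (e.g. overriding temperature),
        # while unset CLI keys (None) leave the model default intact.
        effective = dict(self.default_sampling_params)
        effective.update({k: v for k, v in cli_overrides.items() if v is not None})
        return effective


stage = StageConfig(0, "thinker", {"temperature": 0.4, "top_k": 1})
params = stage.merged_sampling_params({"temperature": 0.9, "top_p": None})
```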

@princepride
Collaborator

LGTM, looking forward to your follow-up PR.

@lishunyang12
Contributor Author

lishunyang12 commented Feb 23, 2026

@hsliuustc0106 @wuhang2014
Tested on 2x A100-SXM4-80GB, PyTorch 2.10.0+cu128, CUDA 12.8, vLLM 0.16.0.

Unit tests:

pytest tests/test_config_factory.py -v  →  30/30 PASSED

All bundled topologies:

| Model | Test | Result |
| --- | --- | --- |
| Qwen3-TTS (1.7B) | --query-type CustomVoice | PASSED |
| Qwen3-TTS (1.7B) | --query-type VoiceDesign | PASSED |
| Bagel (7B) | --modality text2img --prompts "A cute cat" | PASSED |
| Qwen2.5-Omni (7B) | default | PASSED |
| Qwen3-Omni (30B) | default | PASSED |

New CLI override tests (Qwen3-Omni):

| CLI args | Result |
| --- | --- |
| --gpu-memory-utilization 0.85 --enforce-eager | PASSED (args parsed correctly; per-stage engine override takes effect in [2/N]) |
| --output-dir /tmp/pr1115_test_output | PASSED (output redirected to the specified dir) |

No regressions across any model type.

@hsliuustc0106
Collaborator

resolve conflicts

type=str,
default=None,
help="Path to a stage configs file.",
help="Path to a stage configs file. If not specified, uses auto-detected Tier-1 topology.",
Collaborator

Do we need users to understand the Tier-1/Tier-2 definitions?

from omegaconf import DictConfig, OmegaConf


def load_yaml_raw(path: str | Any) -> DictConfig:
Collaborator

def load_yaml_raw(path: str) -> DictConfig:
    cfg = OmegaConf.load(path)
    if not isinstance(cfg, DictConfig):
        raise TypeError(f"Expected a DictConfig from {path}, but got {type(cfg)}")
    return cfg

load_yaml_raw: the function name will read strangely to users; maybe change it to load_yaml_to_config?

@@ -0,0 +1,44 @@
# Tier-1 Stage Topology for Qwen3-Omni-MoE
Collaborator

stage_topo should be changed back to stage_config.

final_output: true
final_output_type: image

connectors:
Collaborator

Should the connector be configured via CLI or YAML?

shared_memory_connector:
name: SharedMemoryConnector

edges:
Collaborator

Do we still need this definition of "edge"?

stages:
- stage_id: 0
model_stage: thinker
stage_type: llm
Collaborator

What's the difference between stage_type and worker_type? Are they redundant?

stage_type: llm
input_sources: [] # Entry point - no upstream stages
worker_type: ar
scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler
Collaborator

Is scheduler_cls still necessary if we have worker_type or stage_type?

scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler
final_output: true
final_output_type: text
is_comprehension: true
Collaborator

What's this?

Contributor

@wuhang2014 wuhang2014 left a comment

I think we should try to introduce as few concepts as we can.

Contributor

Here is my proposal for the concepts we introduce here:
Stage Topology -> Model Pipeline: a model pipeline defines the computation pipeline of a specific model, composed of several stages.
Tier-1 Config -> Model Pipeline Config: we should not expose "Tier" to users or developers, since it conflates concepts that are not orthogonal. For now, the Model Pipeline Config is not user-configurable.
Tier-2 Config -> vLLM-Omni Config: all runtime configs should live under the vLLM-Omni Config. It includes configs inherited from vLLM (e.g. Parallel Config for parallelism) as well as configs for vLLM-Omni-specific scenarios (e.g. Diffusion Config for diffusion models).

Contributor Author

Done. Renamed StageTopology -> ModelPipeline, validate_topology() -> validate_pipeline(), dropped all Tier-1/Tier-2 terminology throughout. Latest commit: 15ee3d3.

Contributor

Here is my proposal for how the config files coordinate with the start-up CLI, to stay compatible with both the single-stage CLI and the all-stages-in-one CLI:

  1. There is a Path for placing config files after refactoring; it takes one of the following two values:

    • Path=vllm_omni/model_executor/model_pipelines
    • Path=vllm_omni/model_pipelines (I prefer this one because it is shorter, and the configs in it have no strong correlation with model_executor)
  2. There is a file for the Model Pipeline Config in each model-named dir:

    • {Path}/<model_name>/pipeline.yaml
  3. For compatibility with the all-stages-in-one CLI, we introduce a new file under each dir to store the default CLI startup args and device info:

    • {Path}/<model_name>/default_args.yaml

Contributor

An example of bagel:

# In {Path}/bagel/pipeline.yaml
model_type: bagel

stages:
  - stage_id: 0
    model_stage: thinker
    stage_type: llm
    input_sources: []  # Entry point - no upstream stages
    worker_type: ar
    scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler
    final_output: true
    final_output_type: text
    is_comprehension: true

  - stage_id: 1
    model_stage: dit
    stage_type: diffusion
    input_sources: [0]  # Receives from thinker
    final_output: true
    final_output_type: image
# In {Path}/bagel/default_args.yaml

stage_args:
  - stage_id: 0
    runtime:
      devices: "0"
      max_batch_size: 1
    engine_args:
      model_stage: thinker
      model_arch: BagelForConditionalGeneration
      worker_type: ar
      scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler
      gpu_memory_utilization: 0.35
      enforce_eager: true
      trust_remote_code: true
      engine_output_type: text
      distributed_executor_backend: "mp"
      enable_prefix_caching: false
      max_num_batched_tokens: 32768
      tensor_parallel_size: 1
      omni_kv_config:
        need_send_cache: true
        kv_transfer_criteria:
          type: prefill_finished #or special token generated
    default_sampling_params:
      temperature: 0.4
      top_p: 0.9
      top_k: 1
      max_tokens: 2048
      seed: 52
      detokenize: True
      repetition_penalty: 1.05

  - stage_id: 1
    runtime:
      devices: "0"
      max_batch_size: 1
    engine_args:
      model_stage: dit
      gpu_memory_utilization: 0.55
      enforce_eager: true
      trust_remote_code: true
      engine_output_type: image
      distributed_executor_backend: "mp"
      enable_prefix_caching: false
      max_num_batched_tokens: 32768
      tensor_parallel_size: 1
      omni_kv_config:
        need_recv_cache: true
    default_sampling_params:
      seed: 52

# Runtime edges
runtime:
  # Distributed connectors configuration (optional)
  # More connectors will be supported in the future.
  connectors:
    shared_memory_connector:
      name: SharedMemoryConnector
      extra:
        shm_threshold_bytes: 65536 # 64KB threshold
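A minimal sketch of how the two files above could be combined at startup. The key names (stages, stage_args, stage_id) follow this proposal; the real loader would go through the PR's yaml_util/OmegaConf helpers, so the dicts below stand in for the parsed (abridged) bagel files:

```python
def merge_pipeline_with_defaults(pipeline: dict, default_args: dict) -> dict:
    """Attach each stage's defaults from default_args.yaml onto the
    structural stage entries from pipeline.yaml (matched by stage_id)."""
    by_stage = {a["stage_id"]: a for a in default_args.get("stage_args", [])}
    merged = {**pipeline, "stages": []}
    for stage in pipeline["stages"]:
        merged["stages"].append(
            {**stage, "defaults": by_stage.get(stage["stage_id"], {})}
        )
    return merged


# Parsed (abridged) contents of the two bagel files from the example above.
pipeline = {
    "model_type": "bagel",
    "stages": [
        {"stage_id": 0, "model_stage": "thinker"},
        {"stage_id": 1, "model_stage": "dit"},
    ],
}
default_args = {"stage_args": [{"stage_id": 0, "runtime": {"devices": "0"}}]}
merged = merge_pipeline_with_defaults(pipeline, default_args)
```

Stages with no entry in default_args.yaml simply get empty defaults, so the structural file stays the single source of truth for which stages exist.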

Contributor Author

Adopted your proposal exactly. Went with vllm_omni/model_pipelines (the shorter path). Each model now has pipeline.yaml + default_args.yaml co-located in its own directory. The old stage_topologies/ directory is deleted; the previous stage-configs mechanism is kept as a legacy fallback for platform-specific configs.

Contributor Author

Implemented exactly this layout for all 4 models. Thanks for the detailed example -- it made the migration straightforward.

Collaborator

do we use stages/stage_args for different yaml?

Contributor Author

do we use stages/stage_args for different yaml?

I'm not quite sure what you're asking. Can you elaborate? An example would help me understand.

@lishunyang12 lishunyang12 changed the title [Config Refactor][1/N] Two-Tier Stage Configuration System [Config Refactor][1/N] Model Pipeline Configuration System Feb 25, 2026
# Default runtime/engine args for Bagel
# Users can override these via CLI flags or a custom stage_configs file.

stage_args:
Collaborator

In my opinion, the arg names in pipeline.yaml should be a subset of those in default_args.yaml; please be careful with the arg names.

Contributor Author

I think that is already the arrangement in the current state. What is your remaining concern?

Signed-off-by: lishunyang <lishunyang12@163.com>
Signed-off-by: Jensen <czjourney@163.com>
Signed-off-by: Jiangyun Zhu <zhu.jiangyun@foxmail.com>
Signed-off-by: Canlin Guo <canlinguosdu@gmail.com>
Signed-off-by: wuhang <wuhang6@huawei.com>
Signed-off-by: Hongsheng Liu <liuhongsheng4@huawei.com>
Signed-off-by: Chenguang Zheng <fake0fan@users.noreply.github.com>
Signed-off-by: lishunyang <lishunyang12@163.com>
Signed-off-by: lishunyang <lishunyang12@163.com>
- Rename StageTopology → ModelPipeline
- Move stage_topologies/ → model_pipelines/<model>/{pipeline,default_args}.yaml
- Rename load_yaml_raw → load_yaml_config
- Drop Tier-1/Tier-2 terminology from code and docs
- Fix bare OmegaConf.create() → create_config()

Signed-off-by: lishunyang12 <lishunyang12@gmail.com>
Signed-off-by: lishunyang <lishunyang12@163.com>
Signed-off-by: lishunyang12 <lishunyang12@gmail.com>
Signed-off-by: lishunyang <lishunyang12@163.com>
…te_code

- Use re.match(r"stage_\d+_") instead of startswith("stage_") to avoid
  silently dropping future CLI args like stage_init_timeout
- Return None instead of [] from create_from_model when no pipeline found
- Thread trust_remote_code through _auto_detect_model_type instead of
  hardcoding True

Signed-off-by: lishunyang12 <lishunyang12@gmail.com>
Signed-off-by: lishunyang <lishunyang12@163.com>
@lishunyang12
Contributor Author

@hsliuustc0106 Thanks for the thorough review. Addressed 1-6, pushed in 81e0218:

  1. Stale comment in end2end.py — Fixed in 3b3d610, that comment is gone now.

  2. Fragile stage_ prefix filter — Changed to re.match(r"stage_\d+_", key) so it only matches actual per-stage overrides. Good catch.

  3. Per-stage CLI overrides not wired — Correct, argparse entries come in [2/N] when we wire the factory into engine startup. The _merge_cli_overrides logic is ready for it.

  4. hf_config_name: thinker_config on code2wav — This is intentional for qwen3_omni_moe. The code2wav stage reuses the thinker's HF config to access shared vocab/tokenizer config. Qwen2.5-omni doesn't need it because its code2wav stage loads config differently. I'll add a comment in the YAML to clarify.

  5. Empty list sentinel — Changed create_from_model to return None instead of []. Return type is now list[StageConfig] | None.

  6. Hardcoded trust_remote_code=True — Now threaded through create_from_model → _load_pipeline → _auto_detect_model_type. Defaults to the value from CLI overrides.

  7. get_final_stage_id_for_e2e behavior change — Fair point. The initialization change is functionally equivalent (the loop always finds a match for valid pipelines), but I'll call it out in the PR description.

Signed-off-by: lishunyang <lishunyang12@163.com>
8 participants